/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

/*
 * This file defines various csi volume test drivers for TestSuites.
 *
 * There are two ways to prepare test drivers:
 * 1) With containerized server (NFS, Ceph, Gluster, iSCSI, ...)
 * It creates a server pod which defines one volume for the tests.
 * These tests work only when privileged containers are allowed, exporting
 * various filesystems (NFS, GlusterFS, ...) usually needs some mounting or
 * other privileged magic in the server pod.
 *
 * Note that the server containers are for testing purposes only and should not
 * be used in production.
 *
 * 2) With server or cloud provider outside of Kubernetes (Cinder, GCE, AWS, Azure, ...)
 * Appropriate server or cloud provider must exist somewhere outside
 * the tested Kubernetes cluster. CreateVolume will create a new volume to be
 * used in the TestSuites for inlineVolume or DynamicPV tests.
 */

package drivers

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/onsi/ginkgo"
	spb "google.golang.org/genproto/googleapis/rpc/status"
	"google.golang.org/grpc/codes"
	grpcstatus "google.golang.org/grpc/status"

	appsv1 "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
	rbacv1 "k8s.io/api/rbac/v1"
	storagev1 "k8s.io/api/storage/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/test/e2e/framework"
	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
	e2evolume "k8s.io/kubernetes/test/e2e/framework/volume"
	mockdriver "k8s.io/kubernetes/test/e2e/storage/drivers/csi-test/driver"
	mockservice "k8s.io/kubernetes/test/e2e/storage/drivers/csi-test/mock/service"
	"k8s.io/kubernetes/test/e2e/storage/drivers/proxy"
	storageframework "k8s.io/kubernetes/test/e2e/storage/framework"
	"k8s.io/kubernetes/test/e2e/storage/utils"

	"google.golang.org/grpc"
)

const (
	// GCEPDCSIDriverName is the name of the GCE Persistent Disk CSI driver.
	GCEPDCSIDriverName = "pd.csi.storage.gke.io"
	// GCEPDCSIZoneTopologyKey is the zone topology key of the GCE Persistent Disk CSI driver.
	GCEPDCSIZoneTopologyKey = "topology.gke.io/zone"

	// grpcCallPrefix is the marker that precedes each JSON-encoded gRPC call
	// in the mock driver log. GetCalls searches the log lines for it.
	grpcCallPrefix = "gRPCCall:"
)

// hostpathCSIDriver is the test driver for the hostpath CSI driver.
//
// driverInfo is what GetDriverInfo returns, manifests lists the files that
// PrepareTest deploys, and volumeAttributes holds the attribute maps that
// GetVolume hands out in a round-robin fashion.
type hostpathCSIDriver struct {
	driverInfo       storageframework.DriverInfo
	manifests        []string
	volumeAttributes []map[string]string
}

// initHostPathCSIDriver assembles a hostpathCSIDriver with the given name,
// capabilities, inline volume attributes and deployment manifests. All other
// driver settings (file size, size range, stress and performance options) are
// fixed.
func initHostPathCSIDriver(name string, capabilities map[storageframework.Capability]bool, volumeAttributes []map[string]string, manifests ...string) storageframework.TestDriver {
	// Volume provisioning metrics are compared to a high baseline.
	// Failure to pass would suggest a performance regression.
	provisioning := &storageframework.PerformanceTestProvisioningOptions{
		VolumeSize: "1Mi",
		Count:      300,
		ExpectedMetrics: &storageframework.Metrics{
			AvgLatency: 2 * time.Minute,
			Throughput: 0.5,
		},
	}

	info := storageframework.DriverInfo{
		Name:        name,
		FeatureTag:  "",
		MaxFileSize: storageframework.FileSizeMedium,
		SupportedFsType: sets.NewString(
			"", // Default fsType
		),
		SupportedSizeRange: e2evolume.SizeRange{
			Min: "1Mi",
		},
		Capabilities: capabilities,
		StressTestOptions: &storageframework.StressTestOptions{
			NumPods:     10,
			NumRestarts: 10,
		},
		VolumeSnapshotStressTestOptions: &storageframework.VolumeSnapshotStressTestOptions{
			NumPods:      10,
			NumSnapshots: 10,
		},
		PerformanceTestOptions: &storageframework.PerformanceTestOptions{
			ProvisioningOptions: provisioning,
		},
	}

	return &hostpathCSIDriver{
		driverInfo:       info,
		manifests:        manifests,
		volumeAttributes: volumeAttributes,
	}
}

// Compile-time checks that hostpathCSIDriver satisfies the test driver interfaces.
var (
	_ storageframework.TestDriver              = (*hostpathCSIDriver)(nil)
	_ storageframework.DynamicPVTestDriver     = (*hostpathCSIDriver)(nil)
	_ storageframework.SnapshottableTestDriver = (*hostpathCSIDriver)(nil)
	_ storageframework.EphemeralTestDriver     = (*hostpathCSIDriver)(nil)
)

// InitHostPathCSIDriver returns a hostpathCSIDriver named "csi-hostpath" that
// implements the TestDriver interface.
func InitHostPathCSIDriver() storageframework.TestDriver {
	manifests := []string{
		"test/e2e/testing-manifests/storage-csi/external-attacher/rbac.yaml",
		"test/e2e/testing-manifests/storage-csi/external-provisioner/rbac.yaml",
		"test/e2e/testing-manifests/storage-csi/external-snapshotter/csi-snapshotter/rbac-csi-snapshotter.yaml",
		"test/e2e/testing-manifests/storage-csi/external-health-monitor/external-health-monitor-controller/rbac.yaml",
		"test/e2e/testing-manifests/storage-csi/external-resizer/rbac.yaml",
		"test/e2e/testing-manifests/storage-csi/hostpath/hostpath/csi-hostpath-driverinfo.yaml",
		"test/e2e/testing-manifests/storage-csi/hostpath/hostpath/csi-hostpath-plugin.yaml",
		"test/e2e/testing-manifests/storage-csi/hostpath/hostpath/e2e-test-rbac.yaml",
	}

	// Volume attributes don't matter, but we have to provide at least one map.
	attributes := []map[string]string{{"foo": "bar"}}

	supported := map[storageframework.Capability]bool{
		storageframework.CapPersistence:         true,
		storageframework.CapSnapshotDataSource:  true,
		storageframework.CapMultiPODs:           true,
		storageframework.CapBlock:               true,
		storageframework.CapPVCDataSource:       true,
		storageframework.CapControllerExpansion: true,
		storageframework.CapOnlineExpansion:     true,
		storageframework.CapSingleNodeVolume:    true,

		// Required by the testsuites/volumelimits.go
		// `should support volume limits` test; the matching
		// --maxvolumespernode=10 argument gets added when
		// patching the deployment.
		storageframework.CapVolumeLimits: true,
	}

	return initHostPathCSIDriver("csi-hostpath", supported, attributes, manifests...)
}

// GetDriverInfo returns a pointer to the driver's own DriverInfo, so changes
// made through the pointer are visible to the driver.
func (h *hostpathCSIDriver) GetDriverInfo() *storageframework.DriverInfo {
	return &h.driverInfo
}

// SkipUnsupportedTest skips CSI inline volume tests when the driver was
// created without any volume attributes; all other patterns are accepted.
func (h *hostpathCSIDriver) SkipUnsupportedTest(pattern storageframework.TestPattern) {
	if pattern.VolType != storageframework.CSIInlineVolume {
		return
	}
	if len(h.volumeAttributes) > 0 {
		return
	}
	e2eskipper.Skipf("%s has no volume attributes defined, doesn't support ephemeral inline volumes", h.driverInfo.Name)
}

// GetDynamicProvisionStorageClass returns a storage class for the per-test
// unique driver name, with no parameters and no explicit binding mode.
// fsType is not used.
func (h *hostpathCSIDriver) GetDynamicProvisionStorageClass(config *storageframework.PerTestConfig, fsType string) *storagev1.StorageClass {
	return storageframework.GetStorageClass(
		config.GetUniqueDriverName(),
		map[string]string{},
		nil,
		config.Framework.Namespace.Name,
	)
}

// GetVolume returns the volume attributes for the given volume number, cycling
// through the configured attribute maps, together with the "shared" and
// "read-only" flags, which are both false. It panics if the driver has no
// volume attributes, because the index is computed modulo their count.
func (h *hostpathCSIDriver) GetVolume(config *storageframework.PerTestConfig, volumeNumber int) (map[string]string, bool, bool) {
	const (
		shared   = false
		readOnly = false
	)
	slot := volumeNumber % len(h.volumeAttributes)
	return h.volumeAttributes[slot], shared, readOnly
}

// GetCSIDriverName returns the per-test unique driver name taken from config.
func (h *hostpathCSIDriver) GetCSIDriverName(config *storageframework.PerTestConfig) string {
	return config.GetUniqueDriverName()
}

// GetSnapshotClass generates a snapshot class spec for the per-test unique
// driver name, using the given parameters and the test namespace name.
func (h *hostpathCSIDriver) GetSnapshotClass(config *storageframework.PerTestConfig, parameters map[string]string) *unstructured.Unstructured {
	return utils.GenerateSnapshotClassSpec(
		config.GetUniqueDriverName(),
		parameters,
		config.Framework.Namespace.Name,
	)
}

// PrepareTest deploys the hostpath CSI driver from the driver's manifests into
// a freshly created driver namespace, pinned to one randomly chosen ready and
// schedulable node.
//
// It returns the per-test configuration and the cleanup function produced by
// generateDriverCleanupFunc. The test is failed if the node cannot be
// determined or the manifests cannot be deployed.
func (h *hostpathCSIDriver) PrepareTest(f *framework.Framework) (*storageframework.PerTestConfig, func()) {
	// Create secondary namespace which will be used for creating driver
	driverNamespace := utils.CreateDriverNamespace(f)
	driverns := driverNamespace.Name
	testns := f.Namespace.Name

	ginkgo.By(fmt.Sprintf("deploying %s driver", h.driverInfo.Name))
	cancelLogging := utils.StartPodLogs(f, driverNamespace)
	cs := f.ClientSet

	// The hostpath CSI driver only works when everything runs on the same node.
	node, err := e2enode.GetRandomReadySchedulableNode(cs)
	framework.ExpectNoError(err)
	config := &storageframework.PerTestConfig{
		Driver:              h,
		Prefix:              "hostpath",
		Framework:           f,
		ClientNodeSelection: e2epod.NodeSelection{Name: node.Name},
		DriverNamespace:     driverNamespace,
	}

	// Options for patching the manifests: the driver is renamed to the
	// per-test unique name and pinned to the selected node.
	o := utils.PatchCSIOptions{
		OldDriverName:       h.driverInfo.Name,
		NewDriverName:       config.GetUniqueDriverName(),
		DriverContainerName: "hostpath",
		DriverContainerArguments: []string{"--drivername=" + config.GetUniqueDriverName(),
			// This is needed for the
			// testsuites/volumelimits.go `should support volume limits`
			// test.
			"--maxvolumespernode=10",
			// Enable volume lifecycle checks, to report failure if
			// the volume is not unpublished / unstaged correctly.
			"--check-volume-lifecycle=true",
		},
		ProvisionerContainerName: "csi-provisioner",
		SnapshotterContainerName: "csi-snapshotter",
		NodeName:                 node.Name,
	}
	cleanup, err := utils.CreateFromManifests(config.Framework, driverNamespace, func(item interface{}) error {
		if err := utils.PatchCSIDeployment(config.Framework, o, item); err != nil {
			return err
		}

		// Remove csi-external-health-monitor-agent and
		// csi-external-health-monitor-controller
		// containers. The agent is obsolete.
		// The controller is not needed for any of the
		// tests and is causing too much overhead when
		// running in a large cluster (see
		// https://github.com/kubernetes/kubernetes/issues/102452#issuecomment-856991009).
		switch item := item.(type) {
		case *appsv1.StatefulSet:
			var containers []v1.Container
			for _, container := range item.Spec.Template.Spec.Containers {
				switch container.Name {
				case "csi-external-health-monitor-agent", "csi-external-health-monitor-controller":
					// Remove these containers.
				default:
					// Keep the others.
					containers = append(containers, container)
				}
			}
			item.Spec.Template.Spec.Containers = containers
		}
		return nil
	}, h.manifests...)

	if err != nil {
		framework.Failf("deploying %s driver: %v", h.driverInfo.Name, err)
	}

	// The cleanup function removes the test namespace, the deployed driver,
	// the log capturing and the driver namespace.
	cleanupFunc := generateDriverCleanupFunc(
		f,
		h.driverInfo.Name,
		testns,
		driverns,
		cleanup,
		cancelLogging)

	return config, cleanupFunc
}

// mockCSIDriver is the test driver for the CSI mock driver.
//
// Most fields are copied from CSIMockDriverOpts by InitMockCSIDriver and are
// consumed by PrepareTest when deploying the driver. attachable is the
// negation of CSIMockDriverOpts.DisableAttach. calls collects the gRPC calls
// observed by interceptGRPC and embeddedCSIDriver is the in-process driver
// instance; both are only populated when embedded is true.
type mockCSIDriver struct {
	driverInfo          storageframework.DriverInfo
	manifests           []string
	podInfo             *bool
	storageCapacity     *bool
	attachable          bool
	attachLimit         int
	enableTopology      bool
	enableNodeExpansion bool
	hooks               Hooks
	tokenRequests       []storagev1.TokenRequest
	requiresRepublish   *bool
	fsGroupPolicy       *storagev1.FSGroupPolicy
	embedded            bool
	calls               MockCSICalls
	embeddedCSIDriver   *mockdriver.CSIDriver

	// Additional values set during PrepareTest
	clientSet       kubernetes.Interface
	driverNamespace *v1.Namespace
}

// Hooks contains callbacks that are run while handling gRPC calls.
//
// At the moment, only generic pre- and post-function call
// hooks are implemented. Those hooks can cast the request and
// response values if needed. More hooks inside specific
// functions could be added if needed.
type Hooks struct {
	// Pre is called before invoking the mock driver's implementation of a method.
	// If either a non-nil reply or a non-nil error is returned, then those are
	// returned to the caller and the mock driver's implementation is skipped.
	Pre func(ctx context.Context, method string, request interface{}) (reply interface{}, err error)

	// Post is called after invoking the mock driver's implementation of a method.
	// What it returns is used as the actual result.
	Post func(ctx context.Context, method string, request, reply interface{}, err error) (finalReply interface{}, finalErr error)
}

// MockCSITestDriver provides additional functions specific to the CSI mock driver
// on top of the DynamicPVTestDriver interface.
type MockCSITestDriver interface {
	storageframework.DynamicPVTestDriver

	// GetCalls returns all currently observed gRPC calls. Only valid
	// after PrepareTest.
	GetCalls() ([]MockCSICall, error)
}

// CSIMockDriverOpts defines the options used by InitMockCSIDriver to set up
// the CSI mock driver.
//
// RegisterDriver, EnableResizing and EnableSnapshot each add the corresponding
// manifest (driver info, resizer, snapshotter) to the deployment, while
// DisableAttach leaves out the attacher manifest. The remaining options are
// stored in the driver and applied when PrepareTest deploys it.
type CSIMockDriverOpts struct {
	RegisterDriver      bool
	DisableAttach       bool
	PodInfo             *bool
	StorageCapacity     *bool
	AttachLimit         int
	EnableTopology      bool
	EnableResizing      bool
	EnableNodeExpansion bool
	EnableSnapshot      bool
	TokenRequests       []storagev1.TokenRequest
	RequiresRepublish   *bool
	FSGroupPolicy       *storagev1.FSGroupPolicy

	// Embedded defines whether the CSI mock driver runs
	// inside the cluster (false, the default) or just a proxy
	// runs inside the cluster and all gRPC calls are handled
	// inside the e2e.test binary.
	Embedded bool

	// Hooks that will be called if (and only if!) the embedded
	// mock driver is used. Beware that hooks are invoked
	// asynchronously in different goroutines.
	Hooks Hooks
}

// MockCSICall is a minimal structure that captures just the method, the
// volume context of the request and the error (as plain string and as gRPC
// code plus message) of a logged CSI call. It is filled by decoding the JSON
// log entry of the call.
type MockCSICall struct {
	json string // full log entry

	Method  string
	Request struct {
		VolumeContext map[string]string `json:"volume_context"`
	}
	FullError struct {
		Code    codes.Code `json:"code"`
		Message string     `json:"message"`
	}
	Error string
}

// MockCSICalls is a thread-safe storage for MockCSICall instances.
// The Get and Add methods hold mutex while accessing calls.
type MockCSICalls struct {
	calls []MockCSICall
	mutex sync.Mutex
}

// Get returns a snapshot of all currently recorded calls.
//
// The result is a copy: it does not share its backing array with the internal
// storage, so the caller may append to or modify it without affecting calls
// that get recorded concurrently or later. The result is nil when nothing has
// been recorded yet.
func (c *MockCSICalls) Get() []MockCSICall {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	// Slicing with [:] would only alias the internal array, therefore the
	// elements are copied into a fresh slice.
	return append([]MockCSICall(nil), c.calls...)
}

// Add appends one new call at the end of the recorded calls.
func (c *MockCSICalls) Add(call MockCSICall) {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	c.calls = append(c.calls, call)
}

// LogGRPC records one gRPC call of the mock CSI driver.
//
// method is the full gRPC method name, request and reply are the messages of
// the call and err is its final error (nil on success). The call is encoded as
// JSON, written to the log with the gRPC call prefix and decoded again into a
// MockCSICall, which is then stored with the service name trimmed from its
// method. Encoding or decoding problems are logged; the call is still recorded
// with at least its method and error string.
func (c *MockCSICalls) LogGRPC(method string, request, reply interface{}, err error) {
	// Encoding to JSON and decoding mirrors the traditional way of capturing calls.
	// Probably could be simplified now...
	logMessage := struct {
		Method   string
		Request  interface{}
		Response interface{}
		// Error as string, for backward compatibility.
		// "" on no error.
		Error string
		// Full error dump, to be able to parse out full gRPC error code and message separately in a test.
		FullError *spb.Status
	}{
		Method:   method,
		Request:  request,
		Response: reply,
	}

	if err != nil {
		logMessage.Error = err.Error()
		logMessage.FullError = grpcstatus.Convert(err).Proto()
	}

	// Seed the entry with the values that are known without the JSON round
	// trip, so that it remains useful even if encoding or decoding fails.
	call := MockCSICall{
		Method: method,
		Error:  logMessage.Error,
	}

	msg, marshalErr := json.Marshal(logMessage)
	if marshalErr != nil {
		klog.Errorf("mock CSI driver: encoding gRPC call %s as JSON failed: %v", method, marshalErr)
	} else {
		call.json = string(msg)
		if unmarshalErr := json.Unmarshal(msg, &call); unmarshalErr != nil {
			klog.Errorf("mock CSI driver: decoding logged gRPC call %s failed: %v", method, unmarshalErr)
		}
		klog.Infof("%s %s", grpcCallPrefix, string(msg))
	}

	// Trim gRPC service name, i.e. "/csi.v1.Identity/Probe" -> "Probe"
	methodParts := strings.Split(call.Method, "/")
	call.Method = methodParts[len(methodParts)-1]

	c.Add(call)
}

// Compile-time checks that mockCSIDriver satisfies the test driver interfaces.
var (
	_ storageframework.TestDriver              = (*mockCSIDriver)(nil)
	_ storageframework.DynamicPVTestDriver     = (*mockCSIDriver)(nil)
	_ storageframework.SnapshottableTestDriver = (*mockCSIDriver)(nil)
)

// InitMockCSIDriver returns a mockCSIDriver configured according to
// driverOpts. The list of manifests to deploy is derived from the options:
// either the proxy or the in-cluster driver, followed by the optional
// driver info, attacher, resizer and snapshotter manifests.
func InitMockCSIDriver(driverOpts CSIMockDriverOpts) MockCSITestDriver {
	manifests := []string{
		"test/e2e/testing-manifests/storage-csi/external-attacher/rbac.yaml",
		"test/e2e/testing-manifests/storage-csi/external-provisioner/rbac.yaml",
		"test/e2e/testing-manifests/storage-csi/external-resizer/rbac.yaml",
		"test/e2e/testing-manifests/storage-csi/external-snapshotter/csi-snapshotter/rbac-csi-snapshotter.yaml",
		"test/e2e/testing-manifests/storage-csi/mock/csi-mock-rbac.yaml",
		"test/e2e/testing-manifests/storage-csi/mock/csi-storageclass.yaml",
	}

	// With the embedded driver only a proxy is deployed in the cluster.
	driverManifest := "test/e2e/testing-manifests/storage-csi/mock/csi-mock-driver.yaml"
	if driverOpts.Embedded {
		driverManifest = "test/e2e/testing-manifests/storage-csi/mock/csi-mock-proxy.yaml"
	}
	manifests = append(manifests, driverManifest)

	// Optional manifests, appended in this fixed order when enabled.
	optional := []struct {
		enabled bool
		path    string
	}{
		{driverOpts.RegisterDriver, "test/e2e/testing-manifests/storage-csi/mock/csi-mock-driverinfo.yaml"},
		{!driverOpts.DisableAttach, "test/e2e/testing-manifests/storage-csi/mock/csi-mock-driver-attacher.yaml"},
		{driverOpts.EnableResizing, "test/e2e/testing-manifests/storage-csi/mock/csi-mock-driver-resizer.yaml"},
		{driverOpts.EnableSnapshot, "test/e2e/testing-manifests/storage-csi/mock/csi-mock-driver-snapshotter.yaml"},
	}
	for _, entry := range optional {
		if entry.enabled {
			manifests = append(manifests, entry.path)
		}
	}

	info := storageframework.DriverInfo{
		Name:        "csi-mock",
		FeatureTag:  "",
		MaxFileSize: storageframework.FileSizeMedium,
		SupportedFsType: sets.NewString(
			"", // Default fsType
		),
		Capabilities: map[storageframework.Capability]bool{
			storageframework.CapPersistence:  false,
			storageframework.CapFsGroup:      false,
			storageframework.CapExec:         false,
			storageframework.CapVolumeLimits: true,
		},
	}

	return &mockCSIDriver{
		driverInfo:          info,
		manifests:           manifests,
		podInfo:             driverOpts.PodInfo,
		storageCapacity:     driverOpts.StorageCapacity,
		enableTopology:      driverOpts.EnableTopology,
		attachable:          !driverOpts.DisableAttach,
		attachLimit:         driverOpts.AttachLimit,
		enableNodeExpansion: driverOpts.EnableNodeExpansion,
		tokenRequests:       driverOpts.TokenRequests,
		requiresRepublish:   driverOpts.RequiresRepublish,
		fsGroupPolicy:       driverOpts.FSGroupPolicy,
		embedded:            driverOpts.Embedded,
		hooks:               driverOpts.Hooks,
	}
}

// GetDriverInfo returns a pointer to the driver's own DriverInfo, so changes
// made through the pointer are visible to the driver.
func (m *mockCSIDriver) GetDriverInfo() *storageframework.DriverInfo {
	return &m.driverInfo
}

// SkipUnsupportedTest is a no-op: the mock driver does not skip any test pattern.
func (m *mockCSIDriver) SkipUnsupportedTest(pattern storageframework.TestPattern) {
}

// GetDynamicProvisionStorageClass returns a storage class for the per-test
// unique driver name, with no parameters and no explicit binding mode.
// fsType is not used.
func (m *mockCSIDriver) GetDynamicProvisionStorageClass(config *storageframework.PerTestConfig, fsType string) *storagev1.StorageClass {
	return storageframework.GetStorageClass(
		config.GetUniqueDriverName(),
		map[string]string{},
		nil,
		config.Framework.Namespace.Name,
	)
}

// GetSnapshotClass generates a snapshot class spec whose snapshotter is the
// driver name suffixed with the framework's unique name, using the given
// parameters and the test namespace name.
func (m *mockCSIDriver) GetSnapshotClass(config *storageframework.PerTestConfig, parameters map[string]string) *unstructured.Unstructured {
	fw := config.Framework
	snapshotter := m.driverInfo.Name + "-" + fw.UniqueName

	return utils.GenerateSnapshotClassSpec(snapshotter, parameters, fw.Namespace.Name)
}

// PrepareTest deploys the CSI mock driver into a freshly created driver
// namespace, pinned to one randomly chosen ready and schedulable node. In
// embedded mode, the mock driver itself is started inside this process and
// listens through the proxy pod; otherwise the in-cluster driver is configured
// via command line arguments.
//
// It returns the per-test configuration and a cleanup function which first
// stops the embedded driver (a no-op when not embedded) and then runs the
// cleanup produced by generateDriverCleanupFunc.
func (m *mockCSIDriver) PrepareTest(f *framework.Framework) (*storageframework.PerTestConfig, func()) {
	m.clientSet = f.ClientSet

	// Create secondary namespace which will be used for creating driver
	m.driverNamespace = utils.CreateDriverNamespace(f)
	driverns := m.driverNamespace.Name
	testns := f.Namespace.Name

	if m.embedded {
		ginkgo.By("deploying csi mock proxy")
	} else {
		ginkgo.By("deploying csi mock driver")
	}
	cancelLogging := utils.StartPodLogs(f, m.driverNamespace)
	cs := f.ClientSet

	// pods should be scheduled on the node
	node, err := e2enode.GetRandomReadySchedulableNode(cs)
	framework.ExpectNoError(err)

	// Replaced below when the embedded driver is started.
	embeddedCleanup := func() {}
	containerArgs := []string{}
	if m.embedded {
		// Run embedded CSI driver.
		//
		// For now we start exactly one instance which implements controller,
		// node and identity services. It matches with the one pod that we run
		// inside the cluster. The name and namespace of that one is deterministic,
		// so we know what to connect to.
		//
		// Long-term we could also deploy one central controller and multiple
		// node instances, with knowledge about provisioned volumes shared in
		// this process.
		podname := "csi-mockplugin-0"
		containername := "mock"
		ctx, cancel := context.WithCancel(context.Background())
		serviceConfig := mockservice.Config{
			DisableAttach:         !m.attachable,
			DriverName:            "csi-mock-" + f.UniqueName,
			AttachLimit:           int64(m.attachLimit),
			NodeExpansionRequired: m.enableNodeExpansion,
			EnableTopology:        m.enableTopology,
			IO: proxy.PodDirIO{
				F:             f,
				Namespace:     m.driverNamespace.Name,
				PodName:       podname,
				ContainerName: "busybox",
			},
		}
		s := mockservice.New(serviceConfig)
		servers := &mockdriver.CSIDriverServers{
			Controller: s,
			Identity:   s,
			Node:       s,
		}
		m.embeddedCSIDriver = mockdriver.NewCSIDriver(servers)

		l, err := proxy.Listen(ctx, f.ClientSet, f.ClientConfig(),
			proxy.Addr{
				Namespace:     m.driverNamespace.Name,
				PodName:       podname,
				ContainerName: containername,
				Port:          9000,
			},
		)
		framework.ExpectNoError(err, "start connecting to proxy pod")
		// Every gRPC call passes through interceptGRPC, which runs the
		// hooks and records the call.
		err = m.embeddedCSIDriver.Start(l, m.interceptGRPC)
		framework.ExpectNoError(err, "start mock driver")

		embeddedCleanup = func() {
			// Kill all goroutines and delete resources of the mock driver.
			m.embeddedCSIDriver.Stop()
			l.Close()
			cancel()
		}
	} else {
		// When using the mock driver inside the cluster it has to be reconfigured
		// via command line parameters.
		containerArgs = append(containerArgs, "--name=csi-mock-"+f.UniqueName)

		if !m.attachable {
			containerArgs = append(containerArgs, "--disable-attach")
		}

		if m.enableTopology {
			containerArgs = append(containerArgs, "--enable-topology")
		}

		if m.attachLimit > 0 {
			containerArgs = append(containerArgs, "--attach-limit", strconv.Itoa(m.attachLimit))
		}

		if m.enableNodeExpansion {
			containerArgs = append(containerArgs, "--node-expand-required=true")
		}
	}

	config := &storageframework.PerTestConfig{
		Driver:              m,
		Prefix:              "mock",
		Framework:           f,
		ClientNodeSelection: e2epod.NodeSelection{Name: node.Name},
		DriverNamespace:     m.driverNamespace,
	}

	// Options for patching the manifests: the driver is renamed to the
	// per-test unique name and pinned to the selected node.
	o := utils.PatchCSIOptions{
		OldDriverName:            "csi-mock",
		NewDriverName:            "csi-mock-" + f.UniqueName,
		DriverContainerName:      "mock",
		DriverContainerArguments: containerArgs,
		ProvisionerContainerName: "csi-provisioner",
		NodeName:                 node.Name,
		PodInfo:                  m.podInfo,
		StorageCapacity:          m.storageCapacity,
		CanAttach:                &m.attachable,
		VolumeLifecycleModes: &[]storagev1.VolumeLifecycleMode{
			storagev1.VolumeLifecyclePersistent,
			storagev1.VolumeLifecycleEphemeral,
		},
		TokenRequests:     m.tokenRequests,
		RequiresRepublish: m.requiresRepublish,
		FSGroupPolicy:     m.fsGroupPolicy,
	}
	cleanup, err := utils.CreateFromManifests(f, m.driverNamespace, func(item interface{}) error {
		if err := utils.PatchCSIDeployment(config.Framework, o, item); err != nil {
			return err
		}

		switch item := item.(type) {
		case *rbacv1.ClusterRole:
			if strings.HasPrefix(item.Name, "external-snapshotter-runner") {
				// Re-enable access to secrets for the snapshotter sidecar for
				// https://github.com/kubernetes/kubernetes/blob/6ede5ca95f78478fa627ecfea8136e0dff34436b/test/e2e/storage/csi_mock_volume.go#L1539-L1548
				// It was disabled in https://github.com/kubernetes-csi/external-snapshotter/blob/501cc505846c03ee665355132f2da0ce7d5d747d/deploy/kubernetes/csi-snapshotter/rbac-csi-snapshotter.yaml#L26-L32
				item.Rules = append(item.Rules, rbacv1.PolicyRule{
					APIGroups: []string{""},
					Resources: []string{"secrets"},
					Verbs:     []string{"get", "list"},
				})
			}
		}

		return nil
	}, m.manifests...)

	if err != nil {
		framework.Failf("deploying csi mock driver: %v", err)
	}

	driverCleanupFunc := generateDriverCleanupFunc(
		f,
		"mock",
		testns,
		driverns,
		cleanup,
		cancelLogging)

	// Stop the embedded driver before removing what was deployed in the cluster.
	cleanupFunc := func() {
		embeddedCleanup()
		driverCleanupFunc()
	}

	return config, cleanupFunc
}

// interceptGRPC is the unary server interceptor of the embedded mock driver.
// It runs the optional Pre hook, the real handler and the optional Post hook,
// and records the call together with its final result.
//
// When the Pre hook returns a non-nil reply or error, that result is returned
// directly and neither the handler nor the Post hook are invoked.
func (m *mockCSIDriver) interceptGRPC(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
	// The deferred function reads the named results, so it always logs the
	// final outcome, no matter whether it came from the real implementation
	// or from a hook.
	defer func() {
		m.calls.LogGRPC(info.FullMethod, req, resp, err)
	}()

	if m.hooks.Pre != nil {
		if resp, err = m.hooks.Pre(ctx, info.FullMethod, req); resp != nil || err != nil {
			return resp, err
		}
	}

	resp, err = handler(ctx, req)
	if m.hooks.Post == nil {
		return resp, err
	}

	resp, err = m.hooks.Post(ctx, info.FullMethod, req, resp, err)
	return resp, err
}

// GetCalls returns all gRPC calls observed so far.
//
// For the embedded driver, the calls recorded in this process are returned.
// Otherwise the log of the driver container is fetched and each line that
// contains the gRPC call prefix is decoded as JSON; lines which cannot be
// decoded are logged and skipped. An error is returned when PrepareTest has
// not been called yet or when the pod log cannot be retrieved; the latter
// error wraps the underlying cause.
func (m *mockCSIDriver) GetCalls() ([]MockCSICall, error) {
	if m.embedded {
		return m.calls.Get(), nil
	}

	if m.driverNamespace == nil {
		return nil, errors.New("PrepareTest not called yet")
	}

	const (
		// Name of the CSI driver pod (it's in a StatefulSet with a stable name)
		driverPodName = "csi-mockplugin-0"
		// Name of the CSI driver container
		driverContainerName = "mock"
	)

	// Load logs of driver pod
	log, err := e2epod.GetPodLogs(m.clientSet, m.driverNamespace.Name, driverPodName, driverContainerName)
	if err != nil {
		return nil, fmt.Errorf("could not load CSI driver logs: %w", err)
	}

	var calls []MockCSICall
	for _, line := range strings.Split(log, "\n") {
		index := strings.Index(line, grpcCallPrefix)
		if index == -1 {
			continue
		}
		// Everything after the prefix is the JSON dump of the call.
		line = line[index+len(grpcCallPrefix):]
		call := MockCSICall{
			json: line,
		}
		if err := json.Unmarshal([]byte(line), &call); err != nil {
			framework.Logf("Could not parse CSI driver log line %q: %s", line, err)
			continue
		}

		// Trim gRPC service name, i.e. "/csi.v1.Identity/Probe" -> "Probe"
		methodParts := strings.Split(call.Method, "/")
		call.Method = methodParts[len(methodParts)-1]

		calls = append(calls, call)
	}
	return calls, nil
}

// gcePDCSIDriver is the test driver for the GCE Persistent Disk CSI driver.
// driverInfo is what GetDriverInfo returns.
type gcePDCSIDriver struct {
	driverInfo storageframework.DriverInfo
}

// Compile-time checks that gcePDCSIDriver satisfies the test driver interfaces.
var (
	_ storageframework.TestDriver              = (*gcePDCSIDriver)(nil)
	_ storageframework.DynamicPVTestDriver     = (*gcePDCSIDriver)(nil)
	_ storageframework.SnapshottableTestDriver = (*gcePDCSIDriver)(nil)
)

// InitGcePDCSIDriver returns a gcePDCSIDriver that implements the TestDriver
// interface, describing the GCE Persistent Disk CSI driver.
func InitGcePDCSIDriver() storageframework.TestDriver {
	capabilities := map[storageframework.Capability]bool{
		storageframework.CapPersistence: true,
		storageframework.CapBlock:       true,
		storageframework.CapFsGroup:     true,
		storageframework.CapExec:        true,
		storageframework.CapMultiPODs:   true,
		// GCE supports volume limits, but the test creates large
		// number of volumes and times out test suites.
		storageframework.CapVolumeLimits:        false,
		storageframework.CapTopology:            true,
		storageframework.CapControllerExpansion: true,
		storageframework.CapOnlineExpansion:     true,
		storageframework.CapNodeExpansion:       true,
		storageframework.CapSnapshotDataSource:  true,
	}

	info := storageframework.DriverInfo{
		Name:        GCEPDCSIDriverName,
		FeatureTag:  "[Serial]",
		MaxFileSize: storageframework.FileSizeMedium,
		SupportedSizeRange: e2evolume.SizeRange{
			Min: "5Gi",
		},
		SupportedFsType: sets.NewString(
			"", // Default fsType
			"ext2",
			"ext3",
			"ext4",
			"xfs",
		),
		SupportedMountOption: sets.NewString("debug", "nouid32"),
		Capabilities:         capabilities,
		RequiredAccessModes:  []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
		TopologyKeys:         []string{GCEPDCSIZoneTopologyKey},
		StressTestOptions: &storageframework.StressTestOptions{
			NumPods:     10,
			NumRestarts: 10,
		},
		VolumeSnapshotStressTestOptions: &storageframework.VolumeSnapshotStressTestOptions{
			// GCE only allows for one snapshot per volume to be created at a time,
			// which can cause test timeouts. We reduce the likelihood of test timeouts
			// by increasing the number of pods (and volumes) and reducing the number
			// of snapshots per volume.
			NumPods:      20,
			NumSnapshots: 2,
		},
	}

	return &gcePDCSIDriver{driverInfo: info}
}

// GetDriverInfo returns a pointer to the driver's own DriverInfo, so changes
// made through the pointer are visible to the driver.
func (g *gcePDCSIDriver) GetDriverInfo() *storageframework.DriverInfo {
	return &g.driverInfo
}

// SkipUnsupportedTest skips the test unless the provider is gce or gke.
// xfs patterns additionally require the ubuntu or custom node OS distro, and
// patterns tagged as Windows features are always skipped.
func (g *gcePDCSIDriver) SkipUnsupportedTest(pattern storageframework.TestPattern) {
	e2eskipper.SkipUnlessProviderIs("gce", "gke")

	needsXFS := pattern.FsType == "xfs"
	if needsXFS {
		e2eskipper.SkipUnlessNodeOSDistroIs("ubuntu", "custom")
	}

	windowsOnly := pattern.FeatureTag == "[Feature:Windows]"
	if windowsOnly {
		e2eskipper.Skipf("Skipping tests for windows since CSI does not support it yet")
	}
}

// GetDynamicProvisionStorageClass returns a storage class for pd-standard
// disks of the GCE PD CSI driver with WaitForFirstConsumer volume binding.
// A non-empty fsType is passed on via the csi.storage.k8s.io/fstype parameter.
func (g *gcePDCSIDriver) GetDynamicProvisionStorageClass(config *storageframework.PerTestConfig, fsType string) *storagev1.StorageClass {
	namespace := config.Framework.Namespace.Name

	params := map[string]string{"type": "pd-standard"}
	if fsType != "" {
		params["csi.storage.k8s.io/fstype"] = fsType
	}

	bindingMode := storagev1.VolumeBindingWaitForFirstConsumer
	return storageframework.GetStorageClass(g.driverInfo.Name, params, &bindingMode, namespace)
}

// GetSnapshotClass generates a snapshot class spec for the driver name,
// using the given parameters and the test namespace name.
func (g *gcePDCSIDriver) GetSnapshotClass(config *storageframework.PerTestConfig, parameters map[string]string) *unstructured.Unstructured {
	return utils.GenerateSnapshotClassSpec(
		g.driverInfo.Name,
		parameters,
		config.Framework.Namespace.Name,
	)
}

// PrepareTest deploys the GCE PD CSI driver into a freshly created driver
// namespace and waits until it is registered on all ready schedulable nodes.
// On GKE nothing is deployed and the returned cleanup function does nothing.
//
// It returns the per-test configuration and the cleanup function.
func (g *gcePDCSIDriver) PrepareTest(f *framework.Framework) (*storageframework.PerTestConfig, func()) {
	testns := f.Namespace.Name
	cfg := &storageframework.PerTestConfig{
		Driver:    g,
		Prefix:    "gcepd",
		Framework: f,
	}

	if framework.ProviderIs("gke") {
		framework.Logf("The csi gce-pd driver is automatically installed in GKE. Skipping driver installation.")
		return cfg, func() {}
	}

	ginkgo.By("deploying csi gce-pd driver")
	// Create secondary namespace which will be used for creating driver
	driverNamespace := utils.CreateDriverNamespace(f)
	driverns := driverNamespace.Name
	cancelLogging := utils.StartPodLogs(f, driverNamespace)

	// It would be safer to rename the gcePD driver, but that
	// hasn't been done before either and attempts to do so now led to
	// errors during driver registration, therefore it is disabled
	// by passing a nil function below.
	//
	// These are the options which would have to be used:
	// o := utils.PatchCSIOptions{
	// 	OldDriverName:            g.driverInfo.Name,
	// 	NewDriverName:            storageframework.GetUniqueDriverName(g),
	// 	DriverContainerName:      "gce-driver",
	// 	ProvisionerContainerName: "csi-external-provisioner",
	// }
	createGCESecrets(f.ClientSet, driverns)

	cleanup, err := utils.CreateFromManifests(f, driverNamespace, nil,
		"test/e2e/testing-manifests/storage-csi/external-attacher/rbac.yaml",
		"test/e2e/testing-manifests/storage-csi/external-provisioner/rbac.yaml",
		"test/e2e/testing-manifests/storage-csi/gce-pd/csi-controller-rbac.yaml",
		"test/e2e/testing-manifests/storage-csi/gce-pd/node_ds.yaml",
		"test/e2e/testing-manifests/storage-csi/gce-pd/controller_ss.yaml",
	)
	if err != nil {
		framework.Failf("deploying csi gce-pd driver: %v", err)
	}

	if err := WaitForCSIDriverRegistrationOnAllNodes(GCEPDCSIDriverName, f.ClientSet); err != nil {
		framework.Failf("waiting for csi driver node registration on: %v", err)
	}

	// The configuration of a deployed driver differs from the GKE one only
	// in the driver namespace.
	cfg.DriverNamespace = driverNamespace
	return cfg, generateDriverCleanupFunc(f, "gce-pd", testns, driverns, cleanup, cancelLogging)
}

// WaitForCSIDriverRegistrationOnAllNodes waits, one node after the other, until
// the CSINode object of every ready schedulable node lists the given driver.
// It stops at the first node for which waiting fails and returns that error.
func WaitForCSIDriverRegistrationOnAllNodes(driverName string, cs clientset.Interface) error {
	readyNodes, err := e2enode.GetReadySchedulableNodes(cs)
	if err != nil {
		return err
	}

	for i := range readyNodes.Items {
		nodeName := readyNodes.Items[i].Name
		if err := WaitForCSIDriverRegistrationOnNode(nodeName, driverName, cs); err != nil {
			return err
		}
	}
	return nil
}

// WaitForCSIDriverRegistrationOnNode waits until the CSINode object of the
// given node, which is generated by the node-registrar, lists the given driver.
//
// The CSINode object is polled with exponential backoff. A CSINode that does
// not exist yet is treated as "not registered yet", any other API error
// aborts the wait. The returned error wraps the underlying cause.
func WaitForCSIDriverRegistrationOnNode(nodeName string, driverName string, cs clientset.Interface) error {
	framework.Logf("waiting for CSIDriver %v to register on node %v", driverName, nodeName)

	// About 8.6 minutes timeout
	backoff := wait.Backoff{
		Duration: 2 * time.Second,
		Factor:   1.5,
		Steps:    12,
	}

	waitErr := wait.ExponentialBackoff(backoff, func() (bool, error) {
		csiNode, err := cs.StorageV1().CSINodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
		if apierrors.IsNotFound(err) {
			// The CSINode object has not been created yet: keep polling
			// without touching the (possibly nil) result.
			return false, nil
		}
		if err != nil {
			return false, err
		}
		for _, driver := range csiNode.Spec.Drivers {
			if driver.Name == driverName {
				return true, nil
			}
		}
		return false, nil
	})
	if waitErr != nil {
		return fmt.Errorf("error waiting for CSI driver %s registration on node %s: %w", driverName, nodeName, waitErr)
	}
	return nil
}

// tryFunc runs f and converts a panic raised by f into an error.
//
// It returns nil when f is nil or when f returns normally. When f panics, the
// panic is recovered and its value is returned, formatted with %v, as an
// error. The result is a named value because a deferred function can only
// change what the function returns by assigning to a named result.
func tryFunc(f func()) (err error) {
	if f == nil {
		return nil
	}
	defer func() {
		if recovered := recover(); recovered != nil {
			err = fmt.Errorf("%v", recovered)
		}
	}()
	f()
	return nil
}

// generateDriverCleanupFunc returns a function that removes everything that
// was set up for a CSI driver test: the test namespace, the driver itself (via
// driverCleanup), the log capturing (via cancelLogging) and the driver
// namespace. driverCleanup and cancelLogging may be nil.
//
// The function is also registered as a framework cleanup action and removes
// that registration again once it has run. Each step is executed through
// tryFunc, so a panic in one step does not prevent the following steps; any
// error reported for a step is logged.
func generateDriverCleanupFunc(
	f *framework.Framework,
	driverName, testns, driverns string,
	driverCleanup, cancelLogging func()) func() {

	cleanupHandle := new(framework.CleanupActionHandle)

	// logFailure reports a failed cleanup step without aborting the cleanup.
	logFailure := func(step string, err error) {
		if err != nil {
			framework.Logf("cleanup of csi %s driver: %s failed: %v", driverName, step, err)
		}
	}

	// Cleanup CSI driver and namespaces. This function needs to be idempotent and can be
	// concurrently called from defer (or AfterEach) and AfterSuite action hooks.
	cleanupFunc := func() {
		ginkgo.By(fmt.Sprintf("deleting the test namespace: %s", testns))
		// Delete the primary namespace but it's okay to fail here because this namespace will
		// also be deleted by framework.Aftereach hook
		logFailure("deleting the test namespace", tryFunc(func() { f.DeleteNamespace(testns) }))

		ginkgo.By(fmt.Sprintf("uninstalling csi %s driver", driverName))
		logFailure("uninstalling the driver", tryFunc(driverCleanup))
		logFailure("stopping pod logging", tryFunc(cancelLogging))

		ginkgo.By(fmt.Sprintf("deleting the driver namespace: %s", driverns))
		logFailure("deleting the driver namespace", tryFunc(func() { f.DeleteNamespace(driverns) }))
		// The cleanup function has already run and hence we don't need to run it again.
		// We do this as the very last action because in case defer (or AfterEach) races
		// with AfterSuite and the test routine gets killed, this block still
		// runs in AfterSuite.
		framework.RemoveCleanupAction(*cleanupHandle)
	}

	*cleanupHandle = framework.AddCleanupAction(cleanupFunc)
	return cleanupFunc
}
